package com.alibaba.datax.plugin.reader.excelreader;


import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.collect.Sets;
import com.monitorjbl.xlsx.StreamingReader;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ss.usermodel.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Pattern;

/****************************************
 *
 *@Author: wangzp
 *@Date: 2023/7/24 Mon 14:27
 *@Desc: DataX reader plugin that reads local Excel workbooks through StreamingReader.
 *
 *****************************************/

public class ExcelReader extends Reader {
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration originConfig = null;

        private List<String> path = null;

        private List<String> sourceFiles;

        private Map<String, Pattern> pattern;

        private Map<String, Boolean> isRegexPath;

        /**
         * Creates the empty per-path lookup maps, captures the plugin job
         * configuration and validates it.
         */
        @Override
        public void init() {
            this.pattern = new HashMap<String, Pattern>();
            this.isRegexPath = new HashMap<String, Boolean>();
            this.originConfig = this.getPluginJobConf();
            this.validateParameter();
        }

        /**
         * Validates and normalizes the job configuration held in {@code originConfig}.
         *
         * <ul>
         *   <li>path: required; either a single string or a list of strings.</li>
         *   <li>encoding: defaults to the unstructured-storage default, otherwise it
         *       is trimmed and must name a supported charset.</li>
         *   <li>column: a single {@code "*"} entry is replaced by {@code null}.</li>
         *   <li>compress: blank becomes {@code null}; otherwise it is lower-cased,
         *       trimmed and must be one of gzip, bzip2 or zip.</li>
         *   <li>fieldDelimiter: when present it must be exactly one character.</li>
         * </ul>
         *
         * @throws DataXException when a required value is missing or a value is illegal
         */
        private void validateParameter() {
            // Compatible with the old version, where path was a plain string.
            String pathInString = this.originConfig.getNecessaryValue(Key.PATH,
                    ExcelReaderErrorCode.REQUIRED_VALUE);
            if (StringUtils.isBlank(pathInString)) {
                throw DataXException.asDataXException(
                        ExcelReaderErrorCode.REQUIRED_VALUE,
                        "您需要指定待读取的源目录或文件");
            }
            // Only a value that is bracketed on BOTH sides is treated as a list.
            // The previous check (!startsWith && !endsWith) sent a plain path that
            // merely started with '[' or ended with ']' down the list branch.
            boolean looksLikeList = pathInString.startsWith("[")
                    && pathInString.endsWith("]");
            if (!looksLikeList) {
                path = new ArrayList<String>();
                path.add(pathInString);
            } else {
                path = this.originConfig.getList(Key.PATH, String.class);
                if (null == path || path.size() == 0) {
                    throw DataXException.asDataXException(
                            ExcelReaderErrorCode.REQUIRED_VALUE,
                            "您需要指定待读取的源目录或文件");
                }
            }

            String encoding = this.originConfig
                    .getString(
                            com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                            com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
            if (StringUtils.isBlank(encoding)) {
                this.originConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                                com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
            } else {
                try {
                    // Store the trimmed name first, then verify the charset exists.
                    encoding = encoding.trim();
                    this.originConfig
                            .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
                                    encoding);
                    Charsets.toCharset(encoding);
                } catch (UnsupportedCharsetException uce) {
                    throw DataXException.asDataXException(
                            ExcelReaderErrorCode.ILLEGAL_VALUE,
                            String.format("不支持您配置的编码格式 : [%s]", encoding), uce);
                } catch (Exception e) {
                    throw DataXException.asDataXException(
                            ExcelReaderErrorCode.CONFIG_INVALID_EXCEPTION,
                            String.format("编码配置异常, 请联系我们: %s", e.getMessage()),
                            e);
                }
            }

            // column: a lone "*" entry means "all columns" and is stored as null.
            // No per-column type/index/value validation is done here; the task
            // reads the column entries as plain strings.
            List<Configuration> columns = this.originConfig
                    .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
            if (null != columns && 1 == columns.size()) {
                String columnsInStr = columns.get(0).toString();
                if ("\"*\"".equals(columnsInStr) || "'*'".equals(columnsInStr)) {
                    this.originConfig
                            .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN,
                                    null);
                }
            }

            // Only a fixed set of compression types is accepted.
            String compress = this.originConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS);
            if (StringUtils.isBlank(compress)) {
                this.originConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
                                null);
            } else {
                Set<String> supportedCompress = Sets
                        .newHashSet("gzip", "bzip2", "zip");
                compress = compress.toLowerCase().trim();
                if (!supportedCompress.contains(compress)) {
                    throw DataXException
                            .asDataXException(
                                    ExcelReaderErrorCode.ILLEGAL_VALUE,
                                    String.format(
                                            "仅支持 gzip, bzip2, zip 文件压缩格式 , 不支持您配置的文件压缩格式: [%s]",
                                            compress));
                }
                this.originConfig
                        .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
                                compress);
            }

            String delimiterInStr = this.originConfig
                    .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.FIELD_DELIMITER);
            // warn: if configured, the delimiter must be exactly one character
            if (null != delimiterInStr && 1 != delimiterInStr.length()) {
                throw DataXException.asDataXException(
                        UnstructuredStorageReaderErrorCode.ILLEGAL_VALUE,
                        String.format("仅仅支持单字符切分, 您配置的切分为 : [%s]",
                                delimiterInStr));
            }

        }

        /**
         * Compiles one regular expression per configured path and then resolves
         * the complete list of source files.
         *
         * <p>Each path is turned into a regex by replacing {@code *} with
         * {@code .*} and {@code ?} with {@code .?}; the path is not trimmed and
         * other regex metacharacters are not escaped.
         *
         * <p>The file scan runs once, after every pattern is registered:
         * {@code buildSourceTargets()} walks all paths, and {@code isTargetFile}
         * looks up the pattern of each wildcard path. Scanning inside the loop
         * dereferenced a missing pattern when a later path contained a wildcard,
         * and rescanned the file system once per path.
         */
        @Override
        public void prepare() {
            LOG.debug("prepare() begin...");
            for (String eachPath : this.path) {
                String regexString = eachPath.replace("*", ".*").replace("?",
                        ".?");
                this.pattern.put(eachPath, Pattern.compile(regexString));
            }
            this.sourceFiles = this.buildSourceTargets();

            LOG.info(String.format("您即将读取的文件数为: [%s]", this.sourceFiles.size()));
        }

        /** Intentionally a no-op: the job has nothing to do in this hook. */
        @Override
        public void post() {
            // nothing to do
        }

        /** Intentionally a no-op: the job holds nothing that needs releasing here. */
        @Override
        public void destroy() {
            // nothing to do
        }

        /**
         * Produces one task configuration per source file.
         *
         * <p>warn: an empty source directory is an error. To express "transfer
         * nothing", supply an empty file explicitly instead.
         *
         * @param adviceNumber suggested number of slices; ignored, because every
         *                     slice reads exactly one file
         * @return a clone of the job configuration per slice, each carrying its
         *         files under {@code Constant.SOURCE_FILES}
         * @throws DataXException when no source file was found
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            LOG.debug("split() begin...");

            // warn: each slice reads one and only one file
            final int fileCount = this.sourceFiles.size();
            if (fileCount == 0) {
                throw DataXException.asDataXException(
                        ExcelReaderErrorCode.EMPTY_DIR_EXCEPTION, String
                                .format("未能找到待读取的文件,请确认您的配置项path: %s",
                                        this.originConfig.getString(Key.PATH)));
            }

            List<Configuration> sliceConfigs = new ArrayList<Configuration>();
            for (List<String> group : this.splitSourceFiles(this.sourceFiles, fileCount)) {
                Configuration sliceConfig = this.originConfig.clone();
                sliceConfig.set(Constant.SOURCE_FILES, group);
                sliceConfigs.add(sliceConfig);
            }

            LOG.debug("split() ok and end...");
            return sliceConfigs;
        }

        /**
         * Resolves every configured path to the set of files to read.
         *
         * <p>For each path the first {@code *} or {@code ?} is located. A path
         * with a wildcard is recorded as a regex path and scanned from the
         * directory in front of that wildcard; any other path is scanned as is.
         *
         * @return the distinct files found, as a fixed-size list
         */
        private List<String> buildSourceTargets() {
            Set<String> toBeReadFiles = new HashSet<String>();
            for (String eachPath : this.path) {
                // Index of the first wildcard, or the path length when there is none.
                int wildcardAt = 0;
                while (wildcardAt < eachPath.length()) {
                    char current = eachPath.charAt(wildcardAt);
                    if (current == '*' || current == '?') {
                        break;
                    }
                    wildcardAt++;
                }

                boolean hasWildcard = wildcardAt < eachPath.length();
                this.isRegexPath.put(eachPath, hasWildcard);

                String parentDirectory = eachPath;
                if (hasWildcard) {
                    // Keep everything up to and including the last separator
                    // that precedes the wildcard.
                    int lastDirSeparator = eachPath.lastIndexOf(
                            IOUtils.DIR_SEPARATOR, wildcardAt - 1);
                    parentDirectory = eachPath.substring(0, lastDirSeparator + 1);
                }
                this.buildSourceTargetsEathPath(eachPath, parentDirectory,
                        toBeReadFiles);
            }
            return Arrays.asList(toBeReadFiles.toArray(new String[0]));
        }

        /**
         * Checks that the scan root exists, then collects matching files below it.
         *
         * @param regexPath       the configured path, possibly with wildcards
         * @param parentDirectory the file or directory the scan starts from
         * @param toBeReadFiles   receives the files selected for reading
         * @throws DataXException when the root does not exist or cannot be inspected
         */
        private void buildSourceTargetsEathPath(String regexPath,
                                                String parentDirectory, Set<String> toBeReadFiles) {
            // Check existence up front so the failure reason is explicit.
            try {
                if (!new File(parentDirectory).exists()) {
                    String notFound = String.format("您设定的目录不存在 : [%s]",
                            parentDirectory);
                    LOG.error(notFound);
                    throw DataXException.asDataXException(
                            ExcelReaderErrorCode.FILE_NOT_EXISTS, notFound);
                }
            } catch (SecurityException se) {
                String denied = String.format("您没有权限查看目录 : [%s]",
                        parentDirectory);
                LOG.error(denied);
                throw DataXException.asDataXException(
                        ExcelReaderErrorCode.SECURITY_NOT_ENOUGH, denied);
            }

            directoryRover(regexPath, parentDirectory, toBeReadFiles);
        }

        /**
         * Recursively walks {@code parentDirectory} and adds every file accepted
         * by {@link #isTargetFile} to {@code toBeReadFiles}.
         *
         * @param regexPath       the configured path the files are matched against
         * @param parentDirectory the file or directory currently being visited
         * @param toBeReadFiles   receives the files selected for reading
         * @throws DataXException when a directory cannot be listed
         */
        private void directoryRover(String regexPath, String parentDirectory,
                                    Set<String> toBeReadFiles) {
            File directory = new File(parentDirectory);

            // A plain file: keep it when it matches, and stop descending.
            if (!directory.isDirectory()) {
                if (this.isTargetFile(regexPath, directory.getAbsolutePath())) {
                    toBeReadFiles.add(parentDirectory);
                    LOG.info(String.format(
                            "add file [%s] as a candidate to be read.",
                            parentDirectory));
                }
                return;
            }

            // A directory: visit every child.
            try {
                // warn: for a directory without permission listFiles returns
                // null instead of throwing SecurityException
                File[] children = directory.listFiles();
                if (null == children) {
                    // warn: an unreadable directory fails the job directly
                    String message = String.format("您没有权限查看目录 : [%s]",
                            directory);
                    LOG.error(message);
                    throw DataXException.asDataXException(
                            ExcelReaderErrorCode.SECURITY_NOT_ENOUGH,
                            message);
                }
                for (File child : children) {
                    directoryRover(regexPath, child.getAbsolutePath(),
                            toBeReadFiles);
                }
            } catch (SecurityException e) {
                String message = String.format("您没有权限查看目录 : [%s]",
                        directory);
                LOG.error(message);
                throw DataXException.asDataXException(
                        ExcelReaderErrorCode.SECURITY_NOT_ENOUGH,
                        message, e);
            }
        }

        /**
         * Regex filter: decides whether a file found during the scan is read.
         *
         * @param regexPath        the configured path the file was found under
         * @param absoluteFilePath absolute path of the candidate file
         * @return {@code true} for every file of a non-wildcard path; otherwise
         *         whether the whole absolute path matches the compiled pattern
         */
        private boolean isTargetFile(String regexPath, String absoluteFilePath) {
            if (!this.isRegexPath.get(regexPath)) {
                return true;
            }
            Pattern compiled = this.pattern.get(regexPath);
            return compiled.matcher(absoluteFilePath).matches();
        }

        /**
         * Cuts {@code sourceList} into consecutive chunks.
         *
         * <p>The chunk length is {@code sourceList.size() / adviceNumber}, raised
         * to 1 when the division yields 0; the last chunk may be shorter. The
         * chunks are {@code subList} views of {@code sourceList}, not copies.
         *
         * @param sourceList   the elements to distribute
         * @param adviceNumber the divisor used to derive the chunk length
         * @return the chunks in their original order
         */
        private <T> List<List<T>> splitSourceFiles(final List<T> sourceList,
                                                   int adviceNumber) {
            final int total = sourceList.size();
            int chunkLength = total / adviceNumber;
            if (chunkLength == 0) {
                chunkLength = 1;
            }

            List<List<T>> chunks = new ArrayList<List<T>>();
            int begin = 0;
            while (begin < total) {
                int end = begin + chunkLength;
                if (end > total) {
                    end = total;
                }
                chunks.add(sourceList.subList(begin, end));
                begin = end;
            }
            return chunks;
        }

    }

    public static class Task extends Reader.Task {
        private static Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration readerSliceConfig;
        private List<String> sourceFiles;

        /**
         * Captures the slice configuration and the list of files stored in it
         * under {@code Constant.SOURCE_FILES}.
         */
        @Override
        public void init() {
            Configuration sliceConfig = this.getPluginJobConf();
            this.readerSliceConfig = sliceConfig;
            this.sourceFiles = sliceConfig.getList(Constant.SOURCE_FILES,
                    String.class);
        }

        /** Intentionally a no-op: the task needs no preparation step. */
        @Override
        public void prepare() {
            // nothing to do
        }

        /** Intentionally a no-op: the task has nothing to do in this hook. */
        @Override
        public void post() {
            // nothing to do
        }

        /** Intentionally a no-op: the task keeps no resources beyond startRead. */
        @Override
        public void destroy() {
            // nothing to do
        }

        /**
         * Reads every source file of this slice and sends one record per data row.
         *
         * <p>Each file is opened as a streaming workbook (StreamingReader only
         * opens XLSX files) and all of its sheets are read in order. In every
         * sheet, rows before the header row are skipped, the header row supplies
         * the column names and every later row is passed to
         * {@link #transportOneRecord}.
         *
         * <p>The workbook and the file stream are closed once a file is done,
         * whether or not reading succeeded; previously neither was closed.
         *
         * @param recordSender destination for the produced records
         * @throws DataXException when a source file cannot be found
         */
        @Override
        public void startRead(RecordSender recordSender) {
            LOG.debug("start read source files...");

            // These settings are the same for every file, so read them once.
            String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
            // The SKIP_HEADER setting is used as the row number of the header row.
            int headerRow = readerSliceConfig.getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.SKIP_HEADER,0);
            List<String> column = readerSliceConfig.getList(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN,String.class);

            for (String fileName : this.sourceFiles) {
                LOG.info(String.format("reading file : [%s]", fileName));
                InputStream inputStream = null;
                Workbook wk = null;
                try {
                    inputStream = new FileInputStream(fileName);
                    wk = StreamingReader.builder()
                            .rowCacheSize(100)  // rows cached in memory (library default: 10)
                            .bufferSize(4096)  // bytes buffered while reading (library default: 1024)
                            .open(inputStream);  // required; accepts an InputStream or a File, XLSX only
                    int sheetNums = wk.getNumberOfSheets();
                    for (int i = 0; i < sheetNums; i++) {
                        readSheet(wk.getSheetAt(i), headerRow, column, nullFormat, recordSender);
                    }
                    recordSender.flush();
                } catch (FileNotFoundException e) {
                    String message = String
                            .format("找不到待读取的文件 : [%s]", fileName);
                    LOG.error(message);
                    throw DataXException.asDataXException(
                            ExcelReaderErrorCode.OPEN_FILE_ERROR, message, e);
                } finally {
                    // Close the workbook before the stream it was opened from.
                    // A failure while closing must not hide the outcome of the read.
                    IOUtils.closeQuietly(wk);
                    IOUtils.closeQuietly(inputStream);
                }
            }
        }

        /**
         * Reads one sheet: collects the header names, then transports every row
         * after the header row.
         *
         * <p>NOTE(review): cells are collected in iteration order, so header and
         * data positions line up only if the row iterator yields a cell for
         * every column. Confirm the behavior for sparse rows.
         */
        private void readSheet(Sheet sheet, int headerRow, List<String> column,
                               String nullFormat, RecordSender recordSender) {
            List<String> headerColumns = new ArrayList<String>();
            TaskPluginCollector collector = this.getTaskPluginCollector();

            for (Row row : sheet) {
                int rowNum = row.getRowNum();
                if (rowNum < headerRow) {
                    continue;
                }
                if (rowNum == headerRow) {
                    for (Cell cell : row) {
                        headerColumns.add(cell.getStringCellValue());
                    }
                    continue;
                }

                List<Cell> cells = new ArrayList<Cell>();
                for (Cell cell : row) {
                    cells.add(cell);
                }
                transportOneRecord(recordSender, column, headerColumns, cells, nullFormat, collector);

                // Row contents are only assembled and logged when debugging;
                // logging every row at INFO floods the job log.
                if (LOG.isDebugEnabled()) {
                    StringBuilder sb = new StringBuilder();
                    for (Cell cell : cells) {
                        sb.append(cell.getStringCellValue());
                    }
                    LOG.debug("===========" + sb.toString());
                }
            }
        }

        /**
         * Column types handled when a cell is converted to a record column. A
         * constant is looked up by name from the name of the cell's type.
         */
        private enum Type {
            STRING,
            LONG,
            BOOLEAN,
            DOUBLE,
            NUMERIC,
            DATE
        }

        /**
         * Converts one spreadsheet row into a record and sends it to the writer.
         *
         * <p>Without configured columns, every cell becomes a string column.
         * Otherwise each configured name is located in {@code headerColumns} and
         * the cell at that position is converted according to its cell type.
         * Date-formatted numeric cells are sent as formatted strings and other
         * numeric cells as doubles. A cell whose text equals {@code nullFormat}
         * yields a null value.
         *
         * <p>Rows that cannot be converted are reported to
         * {@code taskPluginCollector} as dirty records instead of being sent.
         * That covers a configured column missing from the header, a row
         * shorter than the header, an unsupported cell type and a value that
         * fails conversion.
         *
         * @param recordSender        creates the record and forwards it to the writer
         * @param columnConfigs       header names to read, or null/empty for all cells
         * @param headerColumns       header names of the sheet, in cell order
         * @param sourceLine          cells of the row, in cell order
         * @param nullFormat          text that stands for null; may be null
         * @param taskPluginCollector receives rows that could not be converted
         * @return the record built for the row, whether it was sent or marked dirty
         * @throws DataXException when a configured column name is null
         */
        public static Record transportOneRecord(RecordSender recordSender,
                                                List<String> columnConfigs,List<String> headerColumns,List<Cell> sourceLine,
                                                String nullFormat, TaskPluginCollector taskPluginCollector) {
            Record record = recordSender.createRecord();

            // No column configuration: every cell becomes a string column.
            if (null == columnConfigs || columnConfigs.isEmpty()) {
                for (Cell cell : sourceLine) {
                    String text = cell.getStringCellValue();
                    // not equalsIgnoreCase; a null nullFormat never matches
                    record.addColumn(new StringColumn(text.equals(nullFormat) ? null : text));
                }
                recordSender.sendToWriter(record);
                return record;
            }

            try {
                for (String columnName : columnConfigs) {
                    if (null == columnName) {
                        throw DataXException
                                .asDataXException(
                                        UnstructuredStorageReaderErrorCode.NO_INDEX_VALUE,
                                        "需要配置 name 列");
                    }

                    int columnIndex = headerColumns.indexOf(columnName);
                    if (columnIndex < 0) {
                        // Report a missing header by name instead of failing
                        // later on sourceLine.get(-1).
                        String message = String.format(
                                "您配置的列 [%s] 在表头中不存在, 表头为 [%s]",
                                columnName, StringUtils.join(headerColumns, ","));
                        LOG.warn(message);
                        throw new IndexOutOfBoundsException(message);
                    }
                    if (columnIndex >= sourceLine.size()) {
                        String message = String
                                .format("您尝试读取的列越界,源文件该行有 [%s] 列,您尝试读取第 [%s] 列, 数据详情[%s]",
                                        sourceLine.size(), columnIndex + 1,
                                        StringUtils.join(sourceLine, ","));
                        LOG.warn(message);
                        throw new IndexOutOfBoundsException(message);
                    }

                    Cell cell = sourceLine.get(columnIndex);
                    String columnValue = cell.getStringCellValue();
                    String columnType = cell.getCellType().name();
                    // A blank cell is handled like a string cell, as in the
                    // no-column branch above. Any other cell type name that is
                    // not a Type constant makes valueOf throw, which marks the
                    // row as dirty.
                    Type type = "BLANK".equals(columnType)
                            ? Type.STRING
                            : Type.valueOf(columnType.toUpperCase());
                    // a null nullFormat never matches
                    if (columnValue.equals(nullFormat)) {
                        columnValue = null;
                    }

                    Column columnGenerated = null;
                    switch (type) {
                        case STRING:
                            columnGenerated = new StringColumn(columnValue);
                            break;
                        case LONG:
                            try {
                                columnGenerated = new LongColumn(columnValue);
                            } catch (Exception e) {
                                throw new IllegalArgumentException(String.format(
                                        "类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
                                        "LONG"), e);
                            }
                            break;
                        case NUMERIC:
                            try {
                                if (DateUtil.isCellDateFormatted(cell)) {
                                    columnGenerated = new StringColumn(
                                            formatDateCell(cell, columnValue));
                                } else {
                                    columnGenerated = new DoubleColumn(columnValue);
                                }
                            } catch (Exception e) {
                                throw new IllegalArgumentException(String.format(
                                        "类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
                                        "DOUBLE"), e);
                            }
                            break;
                        case BOOLEAN:
                            try {
                                columnGenerated = new BoolColumn(columnValue);
                            } catch (Exception e) {
                                throw new IllegalArgumentException(String.format(
                                        "类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
                                        "BOOLEAN"), e);
                            }
                            break;
                        case DATE:
                            try {
                                if (columnValue == null) {
                                    Date date = null;
                                    columnGenerated = new DateColumn(date);
                                } else {
                                    // let the framework attempt the conversion
                                    columnGenerated = new DateColumn(
                                            new StringColumn(columnValue)
                                                    .asDate());
                                }
                            } catch (Exception e) {
                                throw new IllegalArgumentException(String.format(
                                        "类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
                                        "DATE"), e);
                            }
                            break;
                        default:
                            String errorMessage = String.format(
                                    "您配置的列类型暂不支持 : [%s]", columnType);
                            LOG.error(errorMessage);
                            throw DataXException
                                    .asDataXException(
                                            UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE,
                                            errorMessage);
                    }

                    record.addColumn(columnGenerated);
                }
                recordSender.sendToWriter(record);
            } catch (DataXException e) {
                // Configuration errors abort the task.
                throw e;
            } catch (Exception e) {
                // Every conversion failure, including number and date formats,
                // is handled as a dirty record.
                taskPluginCollector.collectDirtyRecord(record, e.getMessage());
            }

            return record;
        }

        /**
         * Formats a date-formatted numeric cell as text.
         *
         * <p>The pattern depends on the cell's data format id: 20 and 32 give
         * {@code HH:mm}; 14, 31, 57 and 58 give {@code yyyy-MM-dd}; anything
         * else gives {@code yyyy-MM-dd HH:mm:ss}. The original author noted id
         * 58 as the custom "m月d日" date format.
         *
         * <p>Call this only for cells accepted by
         * {@code DateUtil.isCellDateFormatted}. The cell style is read here
         * rather than for every numeric cell, so a plain number needs no style.
         *
         * @param cell     the date-formatted cell
         * @param fallback returned when the date value cannot be read or formatted
         * @return the formatted date, or {@code fallback} on failure
         */
        private static String formatDateCell(Cell cell, String fallback) {
            short format = cell.getCellStyle().getDataFormat();
            String datePattern;
            if (format == 20 || format == 32) {
                datePattern = "HH:mm";
            } else if (format == 14 || format == 31 || format == 57 || format == 58) {
                datePattern = "yyyy-MM-dd";
            } else {
                datePattern = "yyyy-MM-dd HH:mm:ss";
            }
            try {
                return new SimpleDateFormat(datePattern).format(cell.getDateCellValue());
            } catch (Exception e) {
                // Best effort: keep the cell's text and log the failure.
                LOG.warn("exception on get date data !", e);
                return fallback;
            }
        }

    }

}
